package org.zjvis.datascience.service.csv;

import cn.hutool.db.Entity;
import io.swagger.models.auth.In;
import lombok.Data;
import org.apache.commons.io.FileUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.zjvis.datascience.common.constant.DatasetConstant;
import org.zjvis.datascience.common.constant.FileConstant;
import org.zjvis.datascience.common.dto.FileUploadRequestDTO;
import org.zjvis.datascience.common.exception.BaseErrorCode;
import org.zjvis.datascience.common.exception.DataScienceException;
import org.zjvis.datascience.common.util.RedisUtil;
import org.zjvis.datascience.common.vo.dataset.HeadVO;
import org.zjvis.datascience.common.vo.dataset.PreviewDatasetVO;
import org.zjvis.datascience.service.csv.dto.ErrorInfo;
import org.zjvis.datascience.service.csv.dto.ParseInfo;
import org.zjvis.datascience.service.csv.slice.CsvReader;
import scala.Int;

import java.io.*;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.Callable;

/**
 * @description ParseFile — parses one chunk (block) of a sliced file upload
 * @date 2021-12-29
 */
@Data
public class ParseFile implements Callable<ParseInfo> {
    private final static Logger logger = LoggerFactory.getLogger("ParseFile");

    private File file;
    private int currentBlock = 1;
    private File dstfile;
    private boolean isUTF8;
    private String charSet;
    private long blockSize;
    private FileChannel fileChannel;
    private File partFile;
    private CsvSemiotic csvSemiotic;
    private RedisUtil redisUtil;
    private FileUploadRequestDTO fileUploadRequestDTO;
    private List<Entity> previewData;
    private boolean needHeader;
    private long expireTime;
    private String sliceFolderPath;
    private List<String> headers;
    String formatName;

    /**
     * 必定是从未写过才执行
     * @param file 源文件
     * @param currentBlock 当前块
     * @param blockSize 固定的大小
     */
    public ParseFile(FileUploadRequestDTO fileUploadRequestDTO, File file, int currentBlock, String charSet,
                     long blockSize, FileChannel fileChannel, CsvSemiotic csvSemiotic, RedisUtil redisUtil, boolean needHeader, long expireTime,
                     String sliceFolderPath, List<String> headers, String formatName) {
        this.file = file;
        this.currentBlock = currentBlock;
        this.charSet = charSet;
        this.blockSize = blockSize;
        this.fileChannel = fileChannel;
        this.csvSemiotic = csvSemiotic;
        this.redisUtil = redisUtil;
        this.fileUploadRequestDTO = fileUploadRequestDTO;
        this.needHeader = needHeader;
        this.expireTime = expireTime;
        this.sliceFolderPath = sliceFolderPath;
        this.headers = headers;
        this.formatName = formatName;
    }

    @Override
    public ParseInfo call() {
        InputStreamReader isr = null;
        BufferedReader br = null;
        try {
            int lineCnt = 0;
            boolean isFileWithN = false;
            if (fileUploadRequestDTO.getTotalChunks() > 1) {
                RandomAccessFile raf = new RandomAccessFile(file, "rw");
                long l = file.length();
                raf.seek(l - 1);
                int i = raf.read(); //10 or 13
                raf.close();
                if (i == 10 || i == 13) {
                    isFileWithN = true;
                }
                try (LineNumberReader lineNumberReader = new LineNumberReader(new FileReader(file))){
                    lineNumberReader.skip(Long.MAX_VALUE);
                    lineCnt = lineNumberReader.getLineNumber();
                    lineCnt += 1; //实际上是读取换行符数量 , 所以需要+1
                    if (isFileWithN) lineCnt += 1;
                } catch (IOException e) {
                    logger.error("读取行数失败: ", e);
                    return null;
                }
            }
            if ("utf-8".equals(charSet) || "Unicode".equals(charSet)) {
                isr = new InputStreamReader(new FileInputStream(file), "UTF-8");
            } else {
                isr = new InputStreamReader(new FileInputStream(file), "GB2312");
            }
            br = new BufferedReader(isr);

            CsvReader csvReader = null;
            List<HeadVO> heads = null;
            int cnt = 0;
            if (headers != null) heads = PreviewDatasetVO.toHead(headers, needHeader);
            else {
                logger.error("获取标题失败");
                throw DataScienceException.of(BaseErrorCode.SLICE_UPLOAD_FILE_WRITE_FAIL, "获取标题失败");
            }
            Integer maxShow = 5000 / fileUploadRequestDTO.getTotalChunks() < 0 ? 1:5000 / fileUploadRequestDTO.getTotalChunks();
            csvReader = new CsvReader(br, currentBlock == 1, csvSemiotic, lineCnt, currentBlock - 1, fileUploadRequestDTO.getTotalChunks(), isFileWithN, headers.size(), maxShow);
            previewData = new LinkedList<>();
            int needRows = DatasetConstant.DATA_TYPE_CHECK_SIZE / fileUploadRequestDTO.getTotalChunks();
            while (csvReader.getState() != State.enddocument) {
                List<String> result = csvReader.readNext(false);
                if (result == null || result.size() == 0) continue;
                cnt++;
                if (cnt < needRows) {
                    Entity jo = new Entity();
                    for (int i = 0; i < headers.size(); i++) {
                        if (i >= result.size()) {
                            jo.set(heads.get(i).getName(), null);
                        } else {
                            jo.set(heads.get(i).getName(), result.get(i));
                        }
                    }
                    previewData.add(jo);
                }
                String res = "\r\n" + String.join(csvReader.getSeparate(), result);
                // 获取缓冲区
                ByteBuffer buf = ByteBuffer.allocateDirect(res.getBytes(StandardCharsets.UTF_8).length);
                buf.clear();
                // 存入数据到缓冲区
                buf.put(res.getBytes(StandardCharsets.UTF_8));
                buf.flip(); // 切换到读数据模式
                while(buf.hasRemaining()) {
                    fileChannel.write(buf);
                }
            }
            // 如果，第一行不是header， 应该作为数据添加，且不改变字段名
            if (currentBlock == 1 && !needHeader) {
                Entity firstLine = new Entity();
                for (int i = 0; i < heads.size(); i++) {
                    firstLine.set(heads.get(i).getName(), headers.get(i));
                }
                previewData.add(0, firstLine);
            }

            if (fileUploadRequestDTO.getTotalChunks() > 1) {
                redisUtil.hset(FileConstant.FILE_CHUNK_SEG_INFO + "_" + fileUploadRequestDTO.getIdentifier() + "_" + formatName,
                        (currentBlock - 1) + "_end",
                        csvReader.getEndRow());
                if (currentBlock != 1) {
                    redisUtil.hset(FileConstant.FILE_CHUNK_SEG_INFO + "_" + fileUploadRequestDTO.getIdentifier() + "_" + formatName,
                            (currentBlock - 1) + "_begin",
                            csvReader.getBeginRow());
                }
            }
            // 对该块进行解析, 假存储信息，防止后续堵塞
            redisUtil.lSet(FileConstant.FILE_UPLOAD_PREVIEW_DATA + "_" + fileUploadRequestDTO.getIdentifier() + "_" + formatName + "_"
                    + (currentBlock-1), previewData, expireTime);
            setAndCheckConfFile(fileUploadRequestDTO);
            ErrorInfo errorInfo = csvReader.geterrorLineInfo();
            errorInfo.setSuccessCount(cnt);
            return ParseInfo.builder()
                    .errorInfo(errorInfo)
                    .build();
        } catch (Exception e) {
            logger.error("ParseFile call: " + e);
            return null;
        } finally {
            // 将进度加入redis
            redisUtil.hincr(FileConstant.FILE_CHECK_PROGRESS, fileUploadRequestDTO.getIdentifier() + "_" + formatName, 1);
            if (br != null) {
                try {
                    br.close();
                } catch (IOException e) {
                    logger.error("ParseFile call: " + e);
                }
            }

            if (isr != null) {
                try {
                    isr.close();
                } catch (IOException e) {
                    logger.error("ParseFile call: " + e);
                }
            }
        }
    }

    protected boolean setAndCheckConfFile(FileUploadRequestDTO fileUploadRequestDTO) throws DataScienceException {
        try {
            // 配置文件
            File confFile = new File(sliceFolderPath + File.separator + fileUploadRequestDTO.getIdentifier() + File.separator +formatName,
                    fileUploadRequestDTO.getIdentifier() + ".conf");
            RandomAccessFile randomAccessFile = new RandomAccessFile(confFile, "rw");
            randomAccessFile.setLength(fileUploadRequestDTO.getTotalChunks());
            randomAccessFile.seek(currentBlock - 1);
            randomAccessFile.write(Byte.MAX_VALUE);
            randomAccessFile.close();
            // check
            byte[] completeList = FileUtils.readFileToByteArray(confFile);
            byte isComplete = 0;
            isComplete = Byte.MAX_VALUE;
            for (int i = 0; i < completeList.length && isComplete == Byte.MAX_VALUE; i++) {
                //与运算, 如果有部分没有完成则 isComplete 不是 Byte.MAX_VALUE
                isComplete = (byte) (isComplete & completeList[i]);
            }

            // 针对是否完成操作
            if (isComplete == Byte.MAX_VALUE) {
                redisUtil.hset(FileConstant.FILE_UPLOAD_COMPLETE, fileUploadRequestDTO.getIdentifier() + "_" + formatName, true, expireTime);
                confFile.delete();
                return true;
            } else if (!redisUtil.hHasKey(FileConstant.FILE_UPLOAD_COMPLETE, fileUploadRequestDTO.getIdentifier() + "_" + formatName)) {
                redisUtil.hset(FileConstant.FILE_UPLOAD_COMPLETE, fileUploadRequestDTO.getIdentifier() + "_" + formatName, false, expireTime);
            }
            return false;
        } catch (FileNotFoundException e) {
            logger.error("slice upload setAndCheckConfFile faild, FileNotFoundException: ", e);
            throw DataScienceException.of(BaseErrorCode.SLICE_UPLOAD_FILE_NOT_FOUND, "服务器中未找到conf文件");
        } catch (IOException e) {
            logger.error("slice upload setAndCheckConfFile faild, IOException: ", e);
            throw DataScienceException.of(BaseErrorCode.SLICE_UPLOAD_FILE_WRITE_FAIL, "写conf文件失败");
        }
    }
}
